package biz

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"github.com/gogf/gf/util/gconv"
	transportBroker "github.com/tx7do/kratos-transport/broker"
	"github.com/tx7do/kratos-transport/broker/kafka"
	v1 "kratos_kafka/api/helloworld/v1"
	"kratos_kafka/internal/utils/mq_kafka"
	"time"

	"github.com/go-kratos/kratos/v2/log"
)

// SendMsgToKafka builds an ArticleData payload from the request, serializes
// it to JSON and publishes it to the configured article topic.
// It returns a non-nil error when marshalling or publishing fails.
func (uc *GreeterUsecase) SendMsgToKafka(ctx context.Context, req *v1.SendMsgToKafkaReq) error {
	article := mq_kafka.ArticleData{
		Title:       req.Title,
		Desc:        req.Desc,
		Tags:        req.Tags,
		PublishTime: time.Now(),
	}

	body, err := json.Marshal(article)
	if err != nil {
		return fmt.Errorf("marshalling article data: %w", err)
	}

	// WithBatchSize(1): send every message immediately instead of the default
	// batch of 100. WithAsync(true): publish asynchronously.
	if err := uc.broker.Kafka.Publish(
		uc.yamlConf.MessageQueue.Kafka.ArticleTopic,
		body,
		kafka.WithBatchSize(1),
		kafka.WithAsync(true),
	); err != nil {
		// BUG FIX: the publish error was previously only printed to stdout and
		// nil was returned, silently swallowing the failure. Propagate it so
		// callers can react (retry, surface to the client, ...).
		return fmt.Errorf("publishing article to kafka topic %q: %w",
			uc.yamlConf.MessageQueue.Kafka.ArticleTopic, err)
	}

	return nil
}

// AutoHandleArticleData is the consumer callback for ArticleData messages
// received from kafka. It validates the payload and logs it; business logic
// for handling the message goes here. Invalid messages are logged and
// acknowledged (nil return) rather than redelivered.
func (uc *GreeterUsecase) AutoHandleArticleData(ctx context.Context, msg *mq_kafka.ArticleData) error {
	// BUG FIX: guard against a nil msg BEFORE dereferencing it — the previous
	// combined condition still evaluated msg.Title/msg.Tags in the log call
	// when msg was nil, which panics.
	if msg == nil {
		log.Warn("AutoHandleArticleData received a nil ArticleData message")
		return nil
	}
	// len(nil slice) == 0, so a separate nil check on Tags is unnecessary.
	if msg.Title == "" || len(msg.Tags) == 0 {
		log.Warn("AutoHandleArticleData kafka中ArticleData的数据不正确！", msg.Title, msg.Tags)
		return nil
	}

	fmt.Println("从kafka中接收到正确的articleData数据: ", msg.Title, msg.Desc, msg.Tags, msg.PublishTime)

	return nil
}

// AutoHandleArticleData2 is a consumer callback variant that also receives the
// raw broker event alongside the decoded ArticleData. It validates the payload
// and logs it; invalid messages are logged and acknowledged (nil return).
func (uc *GreeterUsecase) AutoHandleArticleData2(ctx context.Context, event transportBroker.Event, msg *mq_kafka.ArticleData) error {
	// BUG FIX: check msg for nil before dereferencing it — the previous
	// combined condition passed msg.Title/msg.Tags to log.Warn even when msg
	// was nil, which panics.
	if msg == nil {
		log.Warn("AutoHandleArticleData2 received a nil ArticleData message")
		return nil
	}
	// len(nil slice) == 0, so a separate nil check on Tags is unnecessary.
	if msg.Title == "" || len(msg.Tags) == 0 {
		log.Warn("AutoHandleArticleData kafka中ArticleData的数据不正确！", msg.Title, msg.Tags)
		return nil
	}

	fmt.Println("从kafka中接收到正确的articleData数据: ", msg.Title, msg.Desc, msg.Tags, msg.PublishTime)

	return nil
}

// CurrentMsg is the default JSON envelope used by SendMsgToKafkaByTopicName
// for ad-hoc messages published to arbitrary topics.
type CurrentMsg struct {
	Msg string `json:"msg"`
}

// SendMsgToKafkaByTopicName publishes req.Msg to the topic named in the
// request. Messages for topics "article_to1" / "article_to2" are wrapped in an
// ArticleData envelope so the consumers subscribed to those topics can decode
// them; everything else is sent as a CurrentMsg envelope.
func (uc *GreeterUsecase) SendMsgToKafkaByTopicName(ctx context.Context, req *v1.SendMsgToKafkaByTopicNameReq) (*v1.Empty, error) {
	topic := req.GetTopic()
	if topic == "" {
		// Robustness: publishing to an empty topic name is always a caller bug.
		return nil, errors.New("topic is required")
	}

	var (
		body       []byte
		errMarshal error
	)
	// Notice: article_to1 / article_to2 consumers expect ArticleData, so wrap
	// the message accordingly to avoid failed deserialization on the consumer.
	if topic == "article_to1" || topic == "article_to2" {
		body, errMarshal = json.Marshal(mq_kafka.ArticleData{
			Title:       req.Msg,
			Desc:        req.Msg,
			Tags:        []string{req.Msg},
			PublishTime: time.Time{},
		})
	} else {
		body, errMarshal = json.Marshal(CurrentMsg{Msg: req.GetMsg()})
	}
	if errMarshal != nil {
		// BUG FIX: both marshal errors were previously discarded with `_`.
		return nil, fmt.Errorf("marshalling kafka payload for topic %q: %w", topic, errMarshal)
	}

	// WithBatchSize(1): send every message immediately instead of the default
	// batch of 100. WithAsync(true): publish asynchronously.
	errPublish := uc.broker.Kafka.Publish(
		topic,
		body,
		kafka.WithBatchSize(1),
		kafka.WithAsync(true),
	)
	if errPublish != nil {
		// fmt.Errorf with %w instead of errors.New(fmt.Sprintf(...)) so the
		// underlying cause can be recovered with errors.Is / errors.As.
		return nil, fmt.Errorf("往kafka中发送数据出错了! req: %v, err: %w", gconv.String(req), errPublish)
	}

	return &v1.Empty{}, nil
}
